Parallel Computing with Python

Rodrigo Nemmen, IAG USP

This IPython notebook illustrates a few simple ways of doing parallel computing.

Practical examples included:

  1. Parallel function mapping to a list of arguments (multiprocessing module)
  2. Parallel execution of array function (scatter/gather) + parallel execution of scripts
  3. Easy parallel Monte Carlo (parallel magics)

In [5]:
%pylab inline


Populating the interactive namespace from numpy and matplotlib

1. Mapping a model to a grid of parameters

Uses the multiprocessing module that ships with Python by default, i.e. this method is independent of IPython.

Idea: you have a function $f(\mathbf{x},\mathbf{y})$ of two parameters (e.g., $f$ may represent your model), which you want to evaluate on the grid of values stored in the arrays $\mathbf{x}$ and $\mathbf{y}$. Let's assume for simplicity that each evaluation is independent of its neighbours. This is an embarrassingly parallel problem.
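As a minimal serial sketch of this setup (with a hypothetical toy model `f`; any function of two parameters would do), the computation is just a loop over the parameter pairs:

```python
import numpy

# hypothetical toy model standing in for an expensive computation
def f(x, y):
    return x**2 + y

x = numpy.linspace(-1, 1, 5)
y = numpy.linspace(0.1, 1, 5)

# serial evaluation: each element is independent of all the others,
# which is what makes the problem embarrassingly parallel
results = [f(xi, yi) for xi, yi in zip(x, y)]
```

Each iteration could run on a different core, which is exactly what the parallel map below exploits.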


In [6]:
import multiprocessing

Time-wasting function that depends on two parameters: here, I generate 1E5 random numbers drawn from a normal distribution and then sum them. The two parameters are the mean $\mu$ and standard deviation $\sigma$.


In [7]:
import scipy

def f(z):
    # z = [mu, sigma]: sum of 1E5 normal deviates with mean mu and std. dev. sigma
    x=z[1]*scipy.random.standard_normal(100000)+z[0]
    return x.sum()

Arrays of input parameters. You could easily modify this to take as input a matrix, not two arrays.


In [8]:
n=3000
X=numpy.linspace(-1,1,n) # mean
Y=numpy.linspace(0.1,1,n) # std. dev.

In [9]:
# creates a list of arguments [Xi, Yi]
pargs = []  # this is a list of lists!
for i in range(X.size):
    pargs.append([X[i], Y[i]])
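The same list of [Xi, Yi] pairs can also be built more compactly with zip (a sketch, using the same n as above):

```python
import numpy

n = 3000
X = numpy.linspace(-1, 1, n)   # mean
Y = numpy.linspace(0.1, 1, n)  # std. dev.

# zip pairs the arrays element by element; wrapping each pair in list()
# reproduces the list-of-lists built by the explicit loop
pargs = [list(p) for p in zip(X, Y)]
```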

Parallel execution. Check out all the cores being used with a tool like htop.


In [10]:
ncores=multiprocessing.cpu_count() # number of cores
pool = multiprocessing.Pool(processes=ncores) # initializes parallel engine

In [11]:
%%time 
t=pool.map(f, pargs)	# parallel function map


CPU times: user 53.1 ms, sys: 4.03 ms, total: 57.1 ms
Wall time: 4.37 s

In [12]:
pool.close()	# close the parallel engine

Serial execution


In [13]:
%time t=list(map(f, pargs))  # list() forces evaluation (map is lazy in Python 3)


CPU times: user 10.8 s, sys: 38.9 ms, total: 10.8 s
Wall time: 10.9 s

If you want to convert the resulting list to an array, use y=array(t). Also note that ipyparallel provides a similar parallel map method.

2. Parallel execution of array function

Uses ipyparallel. Consider a function $f(x)$ which takes an array $x$ containing the grid of input parameters. We want to split the function calls ("split the array") across the different cores of our machine:

Make sure you start the parallel engines first. For example, you can start them from the command line:

$ ipcluster start -n 4

Our time-wasting function $f(x)$, which can be applied to an array of integers


In [14]:
# test if n is prime (deliberately slow: trial division by every i < n)
def isprime(n):
    if n < 2:
        return False
    for i in range(2, n):
        if n % i == 0:
            return False
    return True

# tests whether each element of an array is prime
def f(x):
    return list(map(isprime, x))  # list() forces evaluation in Python 3
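The slow trial division above is deliberate, since we want to give the cores some work. For real use, a much faster variant (a sketch; the name isprime_fast is mine, not from the notebook) only checks divisors up to $\sqrt{n}$:

```python
# faster primality test: trial division only up to sqrt(n)
def isprime_fast(n):
    if n < 2:
        return False
    for i in range(2, int(n**0.5) + 1):
        if n % i == 0:
            return False
    return True
```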

Generates a big array (10k elements) of random integers between 0 and 100000


In [15]:
x = scipy.random.randint(0,100000, (10000,))

Serial execution


In [16]:
%time y=f(x)


CPU times: user 24.5 s, sys: 217 ms, total: 24.7 s
Wall time: 25 s

Now I explain how IPython parallel works (here I show a slide). See the documentation listed at the end of the notebook for details.


In [17]:
import ipyparallel
client = ipyparallel.Client()

We are going to use the direct view, which means that commands always run on all nodes. This is as opposed to a load-balanced view, which asynchronously executes code on whichever nodes are idle. In addition, we are going to turn blocking on: jobs will then block further execution until all nodes have finished.


In [18]:
direct = client[:]
direct.block = True

Splits the input array $x$ between the cores


In [19]:
direct.scatter('x',x)

Verify that the array was indeed divided equally


In [20]:
direct['x.size']


Out[20]:
[2500, 2500, 2500, 2500]

In [21]:
direct['x']


Out[21]:
[array([94305, 72839, 17104, ..., 29346, 73755, 29269]),
 array([31625, 37515, 37053, ..., 76381, 32938, 13199]),
 array([44846, 30440, 38205, ..., 83728,  5019, 84130]),
 array([29578, 88280, 80813, ..., 32620, 52857, 27595])]

Let's try to apply the function on each core


In [22]:
%%px
y=f(x)


[0:execute]: 
NameErrorTraceback (most recent call last)<ipython-input-1-c679c6b49964> in <module>()
----> 1 y=f(x)
NameError: name 'f' is not defined

[1:execute]: 
NameErrorTraceback (most recent call last)<ipython-input-1-c679c6b49964> in <module>()
----> 1 y=f(x)
NameError: name 'f' is not defined

[2:execute]: 
NameErrorTraceback (most recent call last)<ipython-input-1-c679c6b49964> in <module>()
----> 1 y=f(x)
NameError: name 'f' is not defined

[3:execute]: 
NameErrorTraceback (most recent call last)<ipython-input-1-c679c6b49964> in <module>()
----> 1 y=f(x)
NameError: name 'f' is not defined

Why the errors above? Because the engines do not share the notebook's local namespace: they work as separate machines, so you have to load all variables and modules on each engine yourself. That's easy.


In [23]:
%%file myscript.py
# test if n is prime (deliberately slow: trial division by every i < n)
def isprime(n):
    if n < 2:
        return False
    for i in range(2, n):
        if n % i == 0:
            return False
    return True

# tests whether each element of an array is prime
def f(x):
    return list(map(isprime, x))  # list() forces evaluation in Python 3


Overwriting myscript.py

Execute code which defines the methods on the different engines


In [24]:
direct.run("myscript.py")


Out[24]:
<AsyncResult: finished>

Now compute the "model grid" correctly


In [26]:
%%time
%px y=f(x)


CPU times: user 42.7 ms, sys: 8.18 ms, total: 50.8 ms
Wall time: 15.8 s

Alternatively to the command above, you could use

direct.apply(f,x)

or

direct.execute('y=f(x)')

Now each engine holds its own array $y$ with its part of the results. How do we get them back to the local session?


In [27]:
%%px 
import numpy
numpy.size(y)


Out[0:4]: 2500
Out[1:4]: 2500
Out[2:4]: 2500
Out[3:4]: 2500

In [28]:
y=direct.gather('y')

We have the array magically reassembled back in the local engine. :)
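Conceptually, scatter/gather behaves like numpy's array_split followed by concatenate; here is a purely local sketch with no cluster involved:

```python
import numpy

x = numpy.arange(10)

# "scatter": split the array into 4 roughly equal chunks, one per engine
chunks = numpy.array_split(x, 4)

# each engine would process its own chunk; here we just square locally
results = [c**2 for c in chunks]

# "gather": reassemble the per-engine results into a single array
y = numpy.concatenate(results)
```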

3. Easy parallel Monte Carlo

Suppose you need to do 100k Monte Carlo simulations. Wouldn't it be great if you could easily split them among your (hopefully many) cores?

In this example, I will perform 100k realizations of a 300x300 array of random floats.
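One detail to keep in mind when splitting the realizations: if nboot is not divisible by the number of engines, integer division silently drops the remainder. A sketch of an even split (assuming 4 engines, the value len(client.ids) gives on the cluster used here):

```python
nboot = 100000
nengines = 4  # stands in for len(client.ids)

# base number of realizations per engine (integer division)
n = nboot // nengines

# hand the remainder out one by one so all nboot realizations get done
counts = [n + 1 if i < nboot % nengines else n for i in range(nengines)]
```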


In [29]:
# number of desired random sets
nboot=100000

# number of sets that will be computed by each engine
n = nboot // len(client.ids)  # integer division: n must be an int for range(n)

Passes variables to the engines


In [30]:
direct.push(dict(n=n))

with direct.sync_imports():
    import scipy


importing scipy on engine(s)

Now everything below is executed in parallel! (IPython magic)


In [31]:
%%time
%%px

for i in range(n):
    x = scipy.random.random((300,300)) # 300x300 array of floats (values in the range [0,1)   )


CPU times: user 110 ms, sys: 19.9 ms, total: 130 ms
Wall time: 1min 3s

For comparison, how long does it take to do the same simulation in serial mode?


In [32]:
%%time
for i in range(nboot):
    x = scipy.random.random((300,300)) # 300x300 array of floats (values in the range [0,1))


CPU times: user 1min 42s, sys: 551 ms, total: 1min 42s
Wall time: 1min 43s

Useful references

  - IPython video tutorials
  - Parallel computing
      - General
      - Python
  - Executing general (non-python) jobs in parallel
